#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Attributes:
    __version__ (str): "0.0.1"
    __copyright__ (str): "Copyright (C) 2025 z"
    __license__ (str): "MIT"
    __author__ (str): "Z"
"""

import json
import time
import uuid
import datetime

from kafka import KafkaProducer
from kafka.errors import KafkaError

def test(topic):
    print('begin')
    try:
        n = 0
        while True:
            dic = {}
            dic['id'] = n
            n = n + 1
            dic['uuid'] = str(uuid.uuid4().hex)
            dic['time'] = datetime.datetime.now().strftime("%Y%m%d %H:%M:%S")
            producer.send(topic, json.dumps(dic).encode())
            print("send:" + json.dumps(dic))
            time.sleep(0.5)
    except KafkaError as e:
        print(e)
    finally:
        producer.close()
        print('done')


if __name__ == '__main__':
    # 连接到Kafka集群，指定topic
    producer = KafkaProducer(bootstrap_servers=['172.29.126.221:9092', '172.19.149.222:9092', '172.29.126.223:9092'])
    topic = 'test_20181105'
    test(topic)
